<?php
namespace app\common\mq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RocketMQ
{
    private $connection;
    private $channel;
    private $exchange;

    public function __construct()
    {
        $host = env('rocket.host', '');
        $port = env('rocket.port', '');
        $user = env('rocket.user', '');
        $password = env('rocket.password', '');
        $this->exchange = env('rocket.exchange', '');
        $type = env('rocket.type', '');
        // 创建连接
        $this->connection = new AMQPStreamConnection($host, $port, $user, $password);
        // 建立通道
        $this->channel = $this->connection->channel();
        // 声明交换机
        $this->channel->exchange_declare($this->exchange, $type);
    }

    /**
     * RocketMQ 发送消息
     * @author 贺强
     * @time   2022/6/20 9:16
     * @param array $parma 发送消息参数
     */
    public function sendMessage($parma)
    {
        // 设定消息路由key
        $routing_keys = ['info', 'waring', 'error'];
        foreach ($routing_keys as $route){
            // 消息内容
            $msg=new AMQPMessage("this is a direct[$route] message");
            // 发送消息到指定的交换机并设置routing key
            $this->channel->basic_publish($msg, $this->exchange, $route);
        }
        // 资源释放
        $this->channel->close();
        $this->connection->close();
    }
}
